TalendからPostgreSQLにCSVデータをインポートする
はじめに
DI部のおおたきです。Talendを使ってPostgreSQLにCSVファイルのインポートをやってみました。
Redshiftはよく使うのですがPostgreSQLはあまり触ったことがなく、どうやってCSVをインポートするのかと調べたら、Redshiftと同様にCOPYコマンドでインポートが出来ると知り、RedshiftのベースがPostgreSQLだからまぁ当たり前かと納得してました。
環境
Talendで作ってみる
では早速実装してみます。TalendでPostgreSQLにCSVインポートするには「tPostgresqlOutputBulkExec」というコンポーネントがあるのでこちらを使ってみました。設定項目としては以下のようになっています。
見て分かるようにDBの接続情報とファイル名の設定項目があります。ファイル名は変換処理したファイルを出力するパスを設定します。出力した後にこのファイルをPostgreSQLにインポートします。
ジョブ全体としては以下のようになります。
ジョブについて簡単に説明します。まず、tFileInputDelimitedコンポーネントでCSVファイルを読み込みます。次にtMapを使ってCSVデータに変換処理をしています。ここでは読み込んだCSVデータに対してID列を追加しています。
最後にtPostgresqlOutputBulkExecで変換処理したCSVファイルを出力し、そのファイルをインポートします。まずは自分のPCにインストールされているPostgreSQLに登録してみました。
読み込んだCSVファイル
name,email hoge,hoge@test.com fuga,fuga@test.com ・ ・ ・
変換処理をしたCSVファイル
1,hoge,hoge@test.com 2,fuga,fuga@test.com ・ ・ ・
1000件ほどのデータでしたが一瞬で登録ができました。
RDSのPostgreSQLにインポートしてみた
先ほどは自分のPCにインストールしてあるPostgreSQLでしたが今度はAWS上に作成したRDS(PostgresSQL)に対して登録してみました。(tPostgresqlOutputBulkExecのDB接続先のみ変更) 実行したところ、以下のエラーが発生してしまいました。
org.postgresql.util.PSQLException: ERROR: must be superuser to COPY to or from a file
superuser権限がないとダメと書かれているので、権限を付与して再度実行。しかしまた同じエラーが発生。
そこでドキュメントを読んでみると、以下のように書いてあります。
COPYコマンドで指定するファイルは、クライアントアプリケーションではなく、サーバが直接読み込み/書き込みを行います。 したがって、それらのファイルは、クライアントではなく、データベースサーバマシン上に存在するか、または、データベースサーバマシンからアクセス可能である必要があります。
なるほど、RDSに対して登録できないわけです。じゃ、RDSに対してCSVインポートはできないのかと思って調べたら、AWSのドキュメントに登録方法が書いてありました。
\copy コマンドを使用して PostgreSQL DB インスタンス上のテーブルにデータをインポートする psql プロンプトから \copy コマンドを実行して、PostgreSQL DB インスタンス上のテーブルにデータをインポートできます。DB インスタンスには、テーブルがあらかじめ存在している必要があります。\copy コマンドの詳細については、PostgreSQL ドキュメントを参照してください。
ただpsqlコマンドからの実行になるので、Talendから実行すのはちょっと不便です。さらに調べるとJavaでCOPYをするならJDBCにCopyManagerというクラスがあることが判明。(APIドキュメントに記載)
なのでこれを使ってTalendからCSVインポートをできるように修正してみます。
ルーチンを作成する
CopyManagerを呼び出すルーチンPostgresqlUtilを作成します。ルーチン内でコネクションも生成しようかと思ったのですが、コンポーネントで作成したコネクションを渡した方が使い勝手が良さそうなので、引数にコネクションを設定しています。
package routines; import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.Reader; import java.sql.Connection; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; public class PostgresqlUtil { public static long copy(Connection conn, String filePath, String tableName) throws Exception { CopyManager copyManager = new CopyManager((BaseConnection) conn); Reader reader = new InputStreamReader(new FileInputStream(filePath), "UTF8"); String sql = "copy " + tableName + " FROM STDIN WITH DELIMITER ','"; long result = copyManager.copyIn(sql, reader); reader.close(); return result; } }
※ルーチンを作成したら、PostgreSqlのjdbcドライバをダウンロードして外部ライブラリのインポートの設定をしてください。
次に、ジョブを修正します。tPostgresqlOutputBulkExecコンポーネントは使えないので削除し、tFileOutputDelimitedでファイル出力するようにします。次にtPostgresqlConnectionコンポーネントでコネクション生成し、tJavaコンポーネントでルーチンにコネクションを渡すようにします。
修正後のジョブです。
tJava内のコードで先ほど作成したルーチンを呼び出しています。
Connection conn = (Connection)globalMap.get("conn_tPostgresqlConnection_1"); long result = PostgresqlUtil.copy(conn, (String)globalMap.get("tFileOutputDelimited_1_FILE_NAME"), "public.user"); System.out.println(result);
実行したら問題なく登録ができました!
まとめ
いかがでしたでしょうか。RDS(Postgresとジョブが同一環境にない)の場合はコンポーネントが使えず、少し不便ですが一度ルーチンを作ってしまえば割と簡単にCSVファイルのインポートが出来るのではないでしょうか。できればコンポーネントでその辺も対応してくれと助かりますが。。。
今回は以上です。